-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sub-transaction positioning #2035
base: master
Are you sure you want to change the base?
Conversation
When we end up crashing for reasons like we got throttled by the producer, this will allow us to continue to make progress.
@@ -69,8 +79,14 @@ public int hashCode() { | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a bug. We should ensure binlogPosition + txOffset is unique for HashMap.
Shall we fix like this: return Objects.hash(binlogPosition,this.txOffset);
All this mechanism allows maxwell to stop and restart in the middle of very large | ||
transactions. | ||
*/ | ||
Position nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we fix like this, prevent it from some nullpoint exception
Position nextPosition = null;
if(lastTableMapPosition != null){
nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L);
}else{
nextPosition = Position.valueOf(this.nextPosition, lastHeartbeatRead, 0L);
}
for ( RowMap r : rows ) | ||
if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) { | ||
buffer.add(r); | ||
for ( RowMap r : rows ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should prevent this logic from impacting rows having different positions, not applying any tx_offset logic:
if(rows != null){
for ( int i = 0; i < rows.size(); i++) {
RowMap r = rows.get(i);
if(i > 0 && rows.get(i-1).getPosition().equals(r.getPosition())){
//only batch insert uses tx offset
r.setXoffset(++txOffset);
}else{
txOffset = 0l;
r.setXoffset(0l);
}
if ( shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter) ) {
if ( startTXOffset <= r.getXoffset() ) {
buffer.add(r);
}
}
}
}
Hi @osheroff , I have tried this pr to fix this problem: #2119. |
Hi @jackjoesh So I started this work at the last company I worked for and yeah, never quite completed it. I'm happy to work with you to try to push it across the finish line but I think we need some really really good integration tests -- something where an integration test can terminate maxwell in the middle of a transaction and verify that it will properly restart. |
allows maxwell to set the last-good binlog position somewhere inside of an event; this allows us to make progress in the face of periodic crashes or OOM errors from giant txs, stuff like that.